[python] Refactor FullStartingScanner by introducing PartialStartingScanner class #7032

discivigour wants to merge 3 commits into apache:master
Conversation
```python
if idx_of_this_subtask >= number_of_para_subtasks:
    raise Exception("idx_of_this_subtask must be less than number_of_para_subtasks")
if self.start_pos_of_this_subtask is not None:
    raise Exception("with_shard and with_slice cannot be used simultaneously")
```
Use a more specific exception type than the generic `Exception`, such as `ValueError`.
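A minimal sketch of the suggested fix, assuming a simplified `PartialStartingScanner` that carries only the validation logic (the attribute names come from the diff above; everything else is illustrative):

```python
class PartialStartingScanner:
    """Simplified sketch of the scanner's argument validation, not the real class."""

    def __init__(self):
        # In the real code this would be set by with_slice().
        self.start_pos_of_this_subtask = None

    def with_shard(self, idx_of_this_subtask: int, number_of_para_subtasks: int):
        if idx_of_this_subtask >= number_of_para_subtasks:
            # ValueError is the idiomatic type for an invalid argument value.
            raise ValueError(
                "idx_of_this_subtask must be less than number_of_para_subtasks")
        if self.start_pos_of_this_subtask is not None:
            # Mixing shard and slice configuration is API misuse, not an
            # internal failure, so a specific exception fits better.
            raise ValueError(
                "with_shard and with_slice cannot be used simultaneously")
        self.idx_of_this_subtask = idx_of_this_subtask
        self.number_of_para_subtasks = number_of_para_subtasks
        return self
```

Specific exception types also let callers catch misconfiguration precisely instead of trapping every `Exception`.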
```python
split.shard_file_idx_map[file.file_name] = (0, plan_end_pos - file_begin_pos)
elif file_end_pos <= plan_start_pos or file_begin_pos >= plan_end_pos:
    split.shard_file_idx_map[file.file_name] = (-1, -1)
return file_end_pos
```
Define a named constant for `(-1, -1)`; that makes its meaning clear to other developers.
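A sketch of that suggestion; the constant name is hypothetical, and `classify_file` is an illustrative stand-in for the branch in the diff:

```python
# Hypothetical name: marks a file that lies entirely outside this subtask's
# [plan_start_pos, plan_end_pos) range, replacing the bare (-1, -1) tuple.
SHARD_RANGE_EXCLUDED = (-1, -1)


def classify_file(file_begin_pos, file_end_pos, plan_start_pos, plan_end_pos):
    """Illustrative version of the branch above using the named constant."""
    if file_end_pos <= plan_start_pos or file_begin_pos >= plan_end_pos:
        # File does not overlap this subtask's plan range at all.
        return SHARD_RANGE_EXCLUDED
    # Overlapping file: clamp the plan range to offsets local to this file.
    start = max(plan_start_pos, file_begin_pos) - file_begin_pos
    end = min(plan_end_pos, file_end_pos) - file_begin_pos
    return (start, end)
```

Readers of `shard_file_idx_map` can then compare against `SHARD_RANGE_EXCLUDED` instead of a magic tuple.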
```python
return filtered_partitioned_files, (plan_start_pos, plan_end_pos)


def _append_only_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int):
    """
```
This is similar to the code in `_data_evolution_filter_by_shard`; can you refactor to share it?
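One way such a refactor could look, sketched under assumptions: `filter_by_range` is a hypothetical shared helper that both `_append_only_filter_by_shard` and `_data_evolution_filter_by_shard` could delegate to, and the `row_count` attribute on file metas is an assumption:

```python
from collections import defaultdict


def filter_by_range(partitioned_files, plan_start_pos, plan_end_pos):
    """Hypothetical shared helper: keep files whose cumulative row range
    overlaps this subtask's [plan_start_pos, plan_end_pos) window."""
    filtered = defaultdict(list)
    pos = 0
    for partition, files in partitioned_files.items():
        for file in files:
            begin, end = pos, pos + file.row_count  # row_count is an assumption
            if end > plan_start_pos and begin < plan_end_pos:
                filtered[partition].append(file)
            pos = end
    return filtered
```

Each caller would then only compute its own `(plan_start_pos, plan_end_pos)` and pass it in, keeping the overlap logic in one place.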
```python
if is_blob and not self._is_blob_file(file.file_name):

if self._partial_read():
    partitioned_files = self._filter_by_pos(partitioned_files)
```
Do we need to sort the files before calling `_filter_by_pos`?
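If ordering does matter, here is a hypothetical wrapper that fixes a deterministic order before the positional filter runs; the sort key (file name) is an assumption, and the real code may need a different key such as creation time:

```python
def filter_by_pos_sorted(partitioned_files, filter_by_pos):
    """Sketch: sort each partition's files deterministically so that every
    parallel subtask computes identical positional ranges, then filter."""
    for files in partitioned_files.values():
        files.sort(key=lambda f: f.file_name)  # sort key is an assumption
    return filter_by_pos(partitioned_files)
```

Without a stable order, two subtasks could assign different cumulative positions to the same files and produce overlapping or missing shards.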
```python
self.partial_read = True
self.starting_scanner = self._create_starting_scanner()
self.starting_scanner.with_shard(idx_of_this_subtask, number_of_para_subtasks)
return self
```
If `INCREMENTAL_BETWEEN_TIMESTAMP` is configured, `_create_starting_scanner()` returns an `IncrementalStartingScanner`, which doesn't have `with_shard()`. This will cause an `AttributeError`.
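One possible guard, sketched with a hypothetical helper (`apply_shard` is not existing API):

```python
def apply_shard(starting_scanner, idx_of_this_subtask, number_of_para_subtasks):
    """Fail fast with a clear message when the chosen scanner (for example an
    incremental one) does not implement with_shard(), instead of surfacing a
    bare AttributeError later."""
    with_shard = getattr(starting_scanner, "with_shard", None)
    if with_shard is None:
        raise ValueError(
            f"{type(starting_scanner).__name__} does not support with_shard(); "
            "sharding cannot be combined with this scan mode")
    return with_shard(idx_of_this_subtask, number_of_para_subtasks)
```

An explicit error at configuration time is much easier to diagnose than an `AttributeError` raised deep inside the scan.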
```python
def _filter_by_pos(self, files):
    if self.table.is_primary_key_table:
        return self._primary_key_filter_by_shard(files)
```
This may raise an exception if `with_slice` is called for a primary key table.
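A sketch of an explicit guard for that case; the class and attribute names here are illustrative, not the real ones:

```python
class TableScanSketch:
    """Minimal sketch of the dispatch above with an explicit guard for the
    primary-key + slice combination the reviewer points out."""

    def __init__(self, is_primary_key_table, slice_requested):
        self.is_primary_key_table = is_primary_key_table
        self.slice_requested = slice_requested

    def _filter_by_pos(self, files):
        if self.is_primary_key_table:
            if self.slice_requested:
                # Fail fast with a clear message instead of a confusing
                # exception deep inside the shard filtering.
                raise ValueError(
                    "with_slice is not supported for primary key tables")
            return self._primary_key_filter_by_shard(files)
        return files

    def _primary_key_filter_by_shard(self, files):
        return files  # placeholder for the real shard filtering
```

Checking the unsupported combination up front keeps the failure at the call site that configured it.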